查看原文
其他

Streaming Data Warehouse 存储:需求与架构

李劲松@阿里 Apache Flink 2023-05-01

作者|Jingsong Lee jingsonglee0@gmail.com

一、数仓中的计算

在计算机领域,数据仓库(DW 或 DWH),是一个用于报告和数据分析的系统,被认为是商业智能的一个核心组成部分。它将当前和历史数据存储在一个地方,为整个企业的工作人员创建分析报告。[1] 


典型的基于提取、转换、加载(ETL)的数据仓库使用 ODS 层、DWD 层和 DWS 层来容纳其关键功能。数据分析师可以灵活的查询 (Query) 数仓中的每一层,获取有价值的商业信息。


数仓中有三个关键指标 [2]


  • 数据的新鲜度:数据从产生开始,到在仓库中经过一系列处理后可供用户查询所经过的时间长度。通常 ETL 就是用来准备数据的一系列过程,ETL 更多是通过调度运行一系列流计算或者批计算的作业来完成。


  • 数据的查询延时:数据准备好后,用户通过 Query 查询表中的数据,从用户发出查询到收到查询结果的时间长度为查询延时。查询延时直接决定了终端用户的体感。


  • 成本:完成一定量的数据分析(包括 ETL 和查询等各类计算)需要的资源量。成本也是数仓中的一个关键指标。


这三个指标的关系是什么呢?


  • 企业需要在控制成本的情况下,能达到更好的查询延时和新鲜度。不同的数据可能有不同的成本要求。


  • 新鲜度和查询延时在某些情况也是此消彼长的关系,比如使用更长时间来准备数据、清洗和预处理数据,查询会更快。


所以这三者构成了数仓中的一个三角 Tradeoff [2]

(注:三角中,离顶点更近代表更好,离顶点更远代表更差)


对于这个三角 Tradeoff,业界目前的主流架构有着怎么样的取舍呢?


二、业界主流架构


典型的离线数仓:



离线数仓使用 Batch ETL 基于分区粒度来覆写 (INSERT OVERWRITE),在解决超大数据的场景的同时,有着很好的成本控制。


但是它有两个比较严重的问题:


  1. 新鲜度差:数据延时一般是 T + 1,即业务上当天产生的数据需要第二天才能查询到。


  2. 不擅长处理更新流 (Changelog),离线数仓里面存储的都是 Append 数据,如果需要接收类似数据库变更日志的更新流,需要反复的合并全量数据和增量数据,成本激增。



为了解决上述问题,实时数仓逐渐兴起,一个典型的实时数仓实现是使用 Flink + Kafka 的方案构建中间层,最终写到在线数据库或分析系统中,达到秒级的全链路延时,有着非常好的数据新鲜度。


但是,它也逐渐暴露出一些问题。


问题一,中间层不可查


存在 Kafka 中的数据查询受限,无法灵活的进行 OLAP 查询,通常也没有保存长期历史数据。这与广泛使用的数仓有很大不同,在一个成熟的 Warehouse 体系中,数仓中的每一个数据集都应该是可供查询的 Table 抽象,而 Kafka 无法满足用户对于 Table 抽象的所有需求,比如说:


  1. 查询能力受限。实时数仓架构要求所有可供查询的数据集被预先计算,并且最终写入可供查询的分析系统,但实际业务中不是所有计算都可以预先定义的,数据分析师的大量需求是临时的 Ad hoc 查询,如果中间数据 Queue 不可查,这会严重限制业务的数据分析能力。


  2. 问题排查困难。实时数仓中,如果数据有问题,用户需要排查数据 Pipeline,但由于存储中间结果的 Queue 不可查,导致排查难度非常高。


综上,我们希望能有统一的架构得到一个处处可查询的实时数仓,而不是中间结果被管道化的数仓。


问题二,实时链路成本高


天下没有免费的午餐,搭建一条实时链路是比较昂贵的。


  1. 存储成本:不管是 Kafka 还是后面的 ADS 层,它们都是在线服务,虽然有很低的延时,但是有很高的存储成本。


  2. 迁移和维护成本:实时链路是与独立于离线的新的一套系统,并不兼容离线的一套工具链,迁移和维护成本都很大。


由此,我们希望能有一个低成本的实时数仓,它提供低运行成本并兼容离线工具链,同时加速原有的离线数仓。


总结:



离线数仓

实时数仓

成本


新鲜度


数仓中间表查询延时


无法查询

数仓结果表查询延时




因为当前的两套架构面向不同的取舍和场景,所以业务通常只能维护两套架构,甚至需要不同的技术团队,这不仅在带来了很大的资源成本,也带来了昂贵的开发成本和运维成本。


那么我们是不是有可能提供一个在新鲜度、查询延时、查询能力和成本等各方面比较均衡的数仓呢?为了回答这个问题,我们需要分析新鲜度和查询延时背后的技术原理,不同的 Tradeoff 导致的不同架构,以及它们背后的技术差异。


三、ETL 新鲜度


首先需要思考的是数据的新鲜度:数据的新鲜度衡量的是数据从产生开始,到在仓库中经过一系列处理后可供用户查询所经过的时间长度。数据被摄入到数仓里,并且经过一系列 ETL 的处理后,数据才进入可用的状态。


传统的批计算是按照口径来进行 ETL 计算的,所以它的新鲜度是:口径 + ETL延时。一般的口径是天,所以传统离线数仓的新鲜度最少也是一天。按照口径来计算,计算的输入和输出是全量的。如果新鲜度要小于口径,计算的输入和输出是部分的,也就是增量的。典型的增量计算就是流计算,比如 Flink Streaming。


增量计算也不完全等同于流计算,比如也可以有小批次的增量计算。全量计算不完全等同于批计算,比如流计算也可以做 Window 来全量的输出 (也就是说流计算的延迟也可以很大,这样可以降低成本);



四、Query Latency


查询延时会直接影响数据分析效率和体验,查询是返回给人看的,这个人不是机器人,他看到的数据是经过过滤或者聚合后的数据。在传统离线数仓中,查询大表往往可能需要 10+ 分钟的时间。


加速查询的返回最直观的方式是预计算,本质上数仓的 ETL 就是在做预计算的事情,数据分析人员查询的计算需要的时间太久时,他会通知数仓人员,建立对应的 ETL Pipeline,数据准备好后,分析人员直接查询最终结果表即可。从一个角度上看,这其实是在用新鲜度换取更快的查询延时。


但是在传统离线数仓中,有大量的即席查询(Ad Hoc),用户根据自己的需求,灵活的选择查询条件。有大表参与的查询往往可能需要 10+ 分钟的时间,为了尽快的返回结果,各大存储系统使用了各种各样的优化手段。


比如存储更靠近计算,越靠近计算,读取越快:


  • 一些 Message Queue 和 OLAP 系统,它们只提供本地磁盘的存储,这保证了读取性能,但是也牺牲了灵活性,扩容和迁移代价比较大,成本也更高。


  • 另一个方向是计算存储分离的架构,数据全在远程,但是通过本地的 Cache,来减小远程访问 DFS /Object Store 的高昂代价。


比如 Data Skipping,结合查询的条件和字段,跳过不相关的数据来加速数据的查找:


  • Hive:通过分区裁剪查询特定分区,通过列存跳过不相关的字段。


  • 湖存储:在使用列存的基础上,引入文件的统计信息,根据文件的统计信息来尽量减少一些文件的不必要读取。


  • OLAP 系统:在使用列存的基础上,比如使用 LSM 结构来让数据尽可能按照主键有序,有序是最利于查询的结构之一,比如 Clickhouse。


  • KV 系统:通过数据的组织结构,使用 LSM 的结构来加速查询。


  • Message Queue:Queue 其实是通过一种特殊的读取接口来达到快速定位数据的能力,它只提供基于 Offset / Timestamp 的定位方式来接着增量读取数据。


还有很多优化手段,这里不一一枚举了,存储通过各种手段来配合计算加速查询,让查询找得快、读得快。


通过上述的分析,我们可以看到,不同系统底层的技术基本都是相通的:


  • 流计算和批计算是计算的不同模式,它们都可以完成全量计算或者增量计算。


  • 存储加速查询性能的手段都是围绕着找得快和读得快,底层的原理是相通的。


理论上来说,我们应该有可能通过底层技术的某种选择和组合搭建某种架构,来达到我们想要的 Tradeoff。这个统一的架构可能需要根据不同的 Tradeoff 解决以下场景:


  • 实时数仓:新鲜度很好。


  • 近实时数仓:作为离线数仓的加速,在不带来太高昂的成本情况下,提高新鲜度。


  • 离线数仓:有着比较好的成本控制。


  • 离线 OLAP:加速数仓的某一部分的查询性能,比如 ADS 表。


Streaming Warehouse 的目标是成为一个统一的架构:

(注:三角中,离顶点更近代表更好,离顶点更远代表更差)


一个理想的数仓应该是用户可以随意调整成本、新鲜度、查询延时之间的 Tradeoff,这要求数仓能完全覆盖离线数仓、实时数仓、OLAP 的全部能力。Streaming Data Warehouse 在实时数仓的基础上往前走了一步,大幅降低了实时数仓的成本。


Streaming DW 在提供实时计算能力的同时,可以让用户在同套架构下覆盖离线数仓的能力。用户可以根据业务的需求作出相应的 Tradeoff,解决不同场景的问题。


五、Streaming Data Warehouse


在具体看 Streaming Data Warehouse 的存储架构是如何设计之前,我们先来回顾一下之前提到的主流实时数仓的两个问题。解决了这两个问题,Streaming Data Warehouse 的架构设计也就呼之欲出了。


5.1 中间数据不可查


既然中间的 Kafka 存储不可查,一个实时离线一体化的想法是:实时离线一比一双跑,业务层去做尽可能多的封装,尽量让用户看到一套表的抽象。



许多用户都会使用 Flink 加 Kafka 做实时数据流处理,将分析结果写入在线服务层对用户进行展示或进一步分析,与此同时将实时数仓中 Kafka 的数据导入到后台的异步离线数仓,对实时数据进行补充,每天定期大规模的批量运行/全量运行或对历史数据定期修正。[3]


但这个架构存在着几个问题:


  • Table 的抽象不同:采用不同的技术栈,实时链路跟离线链路有两套 Table 抽象,不但增加了开发成本,而且降低了开发效率;业务层尽可能的去封装,但是总会遇到各种磕磕碰碰的问题,有不少不对齐的坑。


  • 实时数仓和离线数仓的数据口径难以保持天然的一致性;



在 Streaming Data Warehouse 中,我们希望数仓有面向查询统一的 Table 抽象,所有流动中的数据皆可分析,没有数据盲点。这就要求这个统一的 Table 抽象能够同时支持两种能力:


  • Message Queue


  • OLAP 查询


也就是说在同一个 Table 上,用户可以以消息队列的方式订阅这个 Table 上的 Change Log,也可以对这个 Table 直接进行 OLAP 查询。


下面我们再来看经典实时数仓的第二个问题。


5.2 实时链路成本高


虽然 Streaming Data Warehouse 提供的统一 Table 抽象能够很好的解决新鲜度和查询延迟的问题,但相较于离线数仓其成本是更高的。在很多时候并非所有的业务场景都对新鲜度和查询延时有很高的要求,因此提供低成本 Table 存储能力依然是必要的。


这里湖存储是一个不错的选择:


  • 湖存储的存储成本更低:湖存储基于 DFS / Object Store,无 Service,资源和运维成本都更低。


  • 湖存储的局部更新灵活:历史分区有问题怎么办?需要订正怎么办?湖存储的计算成本更低,湖存储 + 离线 ETL,INSERT OVERWRITE 订正历史分区,比实时更新成本低很多。


  • 湖存储的开放性:湖存储可以开放给各种批计算引擎。


因此,Streaming Data Warehouse在保持全链路数据实时流动的同时,还需要同时提供低成本的离线存储,并且做到架构不影响实时链路。由于通常来说实时链路的 SLA 要求比离线链路要高,因此 Streaming Data Warehouse 的存储在设计和实现上要把 Queue 的写入和消费作为高优先级,对历史数据的存储不应该影响其作为 Queue 的能力。


六、Flink Table Store


Flink Table Store [4] 正是专门为 Streaming Warehouse 打造的流批一体存储。


在过去的几年里,在我们众多贡献者和用户的帮助下,Apache Flink 已经成为了最好的分布式计算引擎之一,特别是在大规模的有状态流处理方面。尽管如此,当大家试图从数据中实时获得洞察力时,仍然面临着一些挑战。在这些挑战中,一个突出的问题是缺乏能满足所有计算模式的存储。


到现在为止,人们为不同的目的部署一些与 Flink 一起协同的存储系统是很常见的。一个典型的做法是部署一个用于流处理的消息队列,一个用于批处理和 Ad-Hoc 查询的可扫描文件系统/对象存储,以及一个用于点查的 KV 存储。由于其复杂性和异构性,这样的架构在数据质量和系统维护方面都存在挑战。这已经成为了一个损害 Apache Flink 带来的流和批处理统一的端到端用户体验的主要问题。


Flink Table Store 的目标就是要解决上述问题。这对 Flink 来说是重要的一步,它将 Flink 的能力从计算领域扩展到了存储领域。也正因为这样,我们可以为用户提供更好的端到端体验。


6.1 架构


■ 6.1.1 Service


Coordinator 是集群的管控节点,它主要负责管理各 Executors,主要能力有:


  • Coordinator 管理 Executors 的生命周期,客户端通过 Coordinator 寻找 Executors 的地址。


  • Data Manager:

    • 管理 Table 的版本,负责与 Metastore 打交道,定期将版本 checkpoint 到 metastore 里。

    • 根据写入的数据和查询的 Pattern,管理缓存、管理索引。


  • Resource Manager:

    • 管理 Table 的 Buckets 在 Executors 的分布。

    • 根据需要动态的分配 Buckets 到 Executors 上。


Metastore 是个抽象的节点,它可以对接 Hive Metastore,也可以最小化依赖基于 Filesytem,也可以对接你自己的 Metastore,它保存了最基本的表信息。你不用担心性能问题,更详细的复杂的表信息放在了湖存储里。


Executor 是一个单独的计算节点,作为存储的一个 Cache 和本地计算的加速单元:


  • 它负责接收数据的更新,写入本地 Cache、写入本地磁盘、再 Flush 到底层的 DFS 中。


  • 它也面向实时的 OLAP 查询和 Queue 的消费,执行一些加速的本地计算。


每个 Executor 负责一个或多个 Buckets,每个 Bucket 有对应的 Changelog,这些 Changelog 会保存在 Message Queue 里,主要用作:


  • Write ahead log,Executor Failover 后读取 log 来恢复数据。


  • 提供 Queue 的抽象,提供 Table 的 Changelog 流消费给下游的流计算。


■ 6.1.2 湖存储


Executor 的数据经过了 Checkpoint 后就落入了湖存储中,湖存储建立在列存的文件格式和共享 DFS 存储上。湖存储提供完整的 Table Format 抽象,它的主要目的是以较低的成本支撑更新和读取:


  • LSM 结构:用于大量的数据更新和高性能的查询。


  • Columnar File Format:使用 Apache ORC 来支持高效查询。


  • Lake Storage:元数据和数据在 DFS 和 Object Store 上。


■ 6.1.3 冷热分离


存储的读写路径被分为了两条:


  • Streaming Pipeline & Online OLAP Query:通过 Coordinator 获取元数据,从 Executor 里写入和获取数据。


  • Batch Pipeline & Offline Query:通过 Metastore 获取元数据,从湖存储中写入和获取数据。


Service 的数据是最新的,经过了分钟级的 Checkpoint 后同步到了湖存储中。所以用户读取湖存储只会读取到没那么及时的数据,本质上,两边的数据是一致的。


Service 和湖存储的使用有这些区别:


  • Service 适合最新的热数据,提供快速的逐条 Update 写入,高性能的查询延时。


  • Service 不适合 Offline Query,一个是影响 Online 稳定性,另一个是成本会更高。


  • Service 不支持 Batch Pipeline 的 INSERT OVERWRITE。


所以存储需要暴露湖存储来承担这些能力,那业务上如何判断哪些数据是在 Service 里操作,哪些数据在湖存储里操作呢?


只有 ARCHIVE 后的分区才能在湖存储中进行 Batch 的 INSERT OVERWRITE。


  • 用户在创建表时可以指定分区自动 ARCHIVE 时间。


  • 也可以通过 DDL 语句显式的归档某个分区。


6.2 短期目标


■ 6.2.1 短期架构


Streaming Data Warehouse 的整体变革是巨大的,OLAP、Queue、湖存储、流计算、批计算,每一个领域都有佼佼者在其中发力,今天还不可能短期内产出一个完整的解决方案。


但是,我们在前进,在 Apache Flink Table Store 中,我们首先开发了基于 LSM 的湖存储,并原生集成了 Kafka 作为 Log System。



相比于上述章节的完整架构,短期的架构没有 Coordinator 和 Excutors,这意味着它:


  • 不能提供实时 OLAP 的能力,基于文件的 OLAP 只能是准实时的延时。


  • 没有服务化数据管控的能力。


我们希望从底层做起,夯实基础,首先提出一个完整的统一抽象,再在存储上做加速能力,再提供真实时的 OLAP。


目前的架构它提供两个核心价值:


■ 6.2.2 价值一:实时中间层可查


Table Store 给原有实时数仓的 Kafka 分层存储带来查询的能力,中间数据可查;


Table Store 仍然具有流式实时 Pipeline 的能力,它原生 Log 集成,支持集成 Kafka,屏蔽掉流批的概念,用户只看到 Table 的抽象。


但是值得注意的是,数据写入存储不应该影响原有写入 Kafka 的稳定性,这点是需要加强和保证的。


■ 6.2.3 价值二:离线数仓加速


Table Store 加速离线数仓,兼容 Hive 离线数仓的同时,提供增量更新的能力。


Table Store 提供完善的湖存储 Table Format,提供准实时 OLAP 查询,LSM 的结构不但有利于更新性能,也可以有更好的 Data Skipping,加速 OLAP 查询。


6.3 后续计划


社区目前正在努力加强核心功能,稳定存储格式,并补全剩余的部分,使 Flink Table Store 为生产做好准备。


在即将发布的 0.2.0 版本中,我们希望可以提供流批一体的 Table Format,逐渐完善的流批一体湖存储,你可以期待(至少)以下的额外功能:


  • 支持 Apache Hive 引擎的 Flink Table Store Reader。


  • 支持调整 Bucket 的数量。


  • 支持 Append Only 数据,Table Store 不只是限于更新场景。


  • 完整的 Schema Evolution,更好的元数据管理。


  • 根据 0.1.0 预览版的反馈进行改进。


在中期,你也可以期待:


  • 支持 Presto、Trino 和 Apache Spark 的 Flink Table Store Reader。


  • Flink Table Store Service,以加速更新和提高查询性能,拥有毫秒级的 Streaming Pipeline 能力,和较强的 OLAP 能力。


请试一试 0.1.0 预览版,在 Flink 邮件列表中分享您的反馈,并为项目作出贡献。


6.4 项目信息


Apache Flink Table Store 项目[4] 正在开发中,目前已经发布了第一个版本,欢迎大家试用和反馈。


欢迎大家加入下面的讨论群,一起讨论业务的需求和后续的发展:


[1] Data warehouse Wiki: https://en.wikipedia.org/wiki/Data_warehouse

[2] Napa: Powering Scalable Data Warehousing with Robust Query Performance at Google: http://vldb.org/pvldb/vol14/p2986-sankaranarayanan.pdf

[3] Flink Next:Beyond Stream Processing: https://mp.weixin.qq.com/s/CxHzYGf2dg8amivPJzLTPQ

[4] https://github.com/apache/flink-table-store

往期精选




▼ 关注「Apache Flink」,获取更多技术干货 ▼
更多 Flink 相关技术问题,可扫码加入社区钉钉交流群~
   进入 Flink 中文学习网站,查看更多技术内容~

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存